//required packages


import java.sql.*
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.zip.GZIPInputStream

import net.sf.mzmine.datastructures.RawDataAtNode
import net.sf.mzmine.datastructures.Scan

//dependencies required for this script to run, resolved via Grape at startup
@Grapes([
@Grab(group = 'postgresql', module = 'postgresql', version = '8.4-701.jdbc3'),

//local repository! netcdf/MZmine jars are installed locally, not on a public repo
@Grab(group = 'netcdf.jar', module = 'netcdf.jar', version = 'netcdf.jar'),
@Grab(group = 'MZmine.jar', module = 'MZmine.jar', version = 'MZmine.jar')
])
class GrapeIsNotPerfect {
  //empty holder class: @Grapes must be attached to an annotatable element,
  //so this class exists only to carry the dependency annotations
}

//connection settings for the postgres server holding the netcdf repository
String server = "uranus.fiehnlab.ucdavis.edu"
String user = "netcdf"
String password = "netcdf"
String database = "netcdf-repository2"

//how many worker threads do we run (currently pinned to one)
int threads = 1 //Runtime.getRuntime().availableProcessors() + 1
//first ion (mass) for which an iontrace partition is created
int beginMass = 20
//last ion (mass) for which an iontrace partition is created
int endMass = 500

//in test mode the import is limited to testSize spectra per sample
boolean testMode = false
int testSize = 50

//load class - referencing the class literal puts the postgres driver in scope;
//NOTE(review): this relies on JDBC driver self-registration - verify it triggers
driver = org.postgresql.Driver

String url = "jdbc:postgresql://${server}:5432/${database}"

//properties for connecting to the database
Properties props = new Properties();
props.setProperty("user", user);
props.setProperty("password", password);

//where our content (the directory of cdf files to import) is stored
File content = null

//check to make sure we have a directory as argument
if (args.size() != 1) {
  println "please provide a directory as first and only argument!"
  System.exit(-1)
}
else {
  String dir = args[0]
  println "using dir: $dir"
  content = new File(dir)

  assert content.isDirectory(), "sorry it has to be a directory"
}

println "max threads: ${threads}"

//if we are in test mode announce the reduced data amount
if (testMode) {
  println ""
  println "execute in test mode, this limits the data insert to ${testSize} spectra a sample"
  println ""

}

//main connection, used only for the schema setup below
Connection main = DriverManager.getConnection(url, props)
main.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED)
main.setAutoCommit(false)

//generate sequences
generateSequences(main)

//generates the tables
generateTables(main)

//generate the indexes for the database if they are not yet generated
generateIndexes(main)

//generate the partitions if they are not yet generated
generatePartition(main, beginMass, endMass)

//all running workers, so we can signal shutdown and join them later
List<Worker> workers = new Vector<Worker>()

//bounded queue feeding the workers; offer() below throttles the producer
//when the workers fall behind
LinkedBlockingQueue<File> queue = new LinkedBlockingQueue<File>(threads * 2)

println ""
for (int i = 0; i < threads; i++) {
  print("open session and start worker -> ")
  Worker worker = new Worker()
  worker.workerUrl = url
  worker.workerProps = props

  worker.workerQueue = queue
  worker.stillRunning = true
  worker.workerBeginMass = beginMass
  worker.workerEndMass = endMass
  worker.workerTestMode = testMode
  worker.workerTestSize = testSize

  worker.thread = new Thread(worker)
  worker.thread.start()
  workers.add(worker)
  println("done")
}


println ""
println "working on files"

//go over all files in this directory and hand them to the workers
content.listFiles().each {File file ->
  String name = file.getName()

  //retry every second until the bounded queue has room for the file

  while (!queue.offer(file)) {
    Thread.sleep(1000)
  }
  println "added to queue: ${name}"
}
println "all job's are done, stop the work"
//signal each worker to stop and wait for it to finish
//NOTE(review): files still sitting in the queue when the flag is cleared may be
//skipped unless the worker drains the queue before exiting - confirm Worker.run()
for (Worker worker: workers) {
  worker.stillRunning = false
  worker.thread.join()

}

println "closing main session"
//close the main session
main.close()

/**
 * Imports a single cdf file into the database.
 *
 * Files whose name is already registered in the netcdf table are skipped.
 * Gzip-compressed input is first inflated to a temporary copy which is
 * deleted again after the import. All inserts for one file run inside a
 * single transaction: committed at the end, rolled back on error.
 *
 * @param file       the (possibly gzip-compressed) cdf file to import
 * @param session    open database connection; auto-commit is switched off here
 * @param compressed true if the file is gzip-compressed
 * @param name       logical sample name stored in the netcdf table
 * @param beginMass  smallest ion for which an iontrace partition exists
 * @param endMass    largest ion for which an iontrace partition exists
 * @param testMode   if true only testSize spectra are imported per file
 * @param testSize   number of spectra to import in test mode
 */
static void importFile(File file, Connection session, boolean compressed, String name, int beginMass, int endMass, boolean testMode, int testSize) {

  session.autoCommit = false

  //parameterized query so quotes or other special characters in the file
  //name cannot break the SQL (the old string-concatenated query could)
  PreparedStatement check = session.prepareStatement("select name from netcdf where name = ?")
  check.setString(1, name)
  ResultSet result = check.executeQuery()
  boolean alreadyStored = result.next()
  result.close()
  check.close()

  if (alreadyStored) {
    println "file ${name} is already stored in database, skip it"
    session.commit()
    return
  }

  //should we delete the file after this run, only needed for temporary files
  boolean cleanup = false

  if (compressed) {

    //inflate to a temporary file which is removed again after the import
    cleanup = true
    File f = File.createTempFile("netcdf-import", "tmp")
    FileOutputStream out = new FileOutputStream(f)
    InputStream input = new GZIPInputStream(new FileInputStream(file))

    println "uncompressing to file ${f}"
    byte[] b = new byte[4048];
    int read;
    while ((read = input.read(b)) != -1) {
      out.write(b, 0, read);
    }

    out.flush()
    out.close()
    input.close()

    file = f
  }

  println "reading file: ${file}"
  try {
    RawDataAtNode node = new RawDataAtNode(0, file)
    node.setWorkingCopy(file)

    //preload our node into the memory
    print "loading node => "
    node.preLoad()
    node.initializeScanBrowser(0, node.getNumberOfScans())
    println "done"

    //create a new netcdf (sample) record
    long sampleId = getNextId(session, "seq_sample")

    //parameterized insert so the sample name is quoted correctly
    PreparedStatement insertSample = session.prepareStatement("insert into netcdf(id,name) values (?,?)")
    insertSample.setLong(1, sampleId)
    insertSample.setString(2, name)
    insertSample.execute()

    PreparedStatement insertSpectra = session.prepareStatement("insert into spectra(id,retentiontime,file_id) values (?,?,?)")

    //one batched insert statement per ion partition
    Map<Integer, PreparedStatement> insertTraceStatements = new HashMap<Integer, PreparedStatement>()

    for (int i = beginMass; i <= endMass; i++) {
      insertTraceStatements.put(i, session.prepareStatement("insert into iontrace_${i}(intensity, ion, spectra_id) values (?,?,?)"))
    }

    println "${name} spectra: ${node.getNumberOfScans()}"
    //get all scans of this file and add them to the object
    for (int i = 0; i < node.getNumberOfScans(); i++) {

      //access the next scan of the node
      Scan s = node.getNextScan();

      //get the mz values and intensities
      double[] mz = s.getMZValues()
      double[] intensity = s.getIntensityValues()

      long spectraId = getNextId(session, "seq_spectra")
      int retentionTime = node.getScanTime(s.getScanNumber()) * 1000

      insertSpectra.setLong(1, spectraId)
      insertSpectra.setInt(2, retentionTime)
      insertSpectra.setLong(3, sampleId)

      insertSpectra.addBatch()

      //build the ion trace
      for (int mass = 0; mass < mz.length; mass++) {

        //the partition is selected by the ion value (mz), not by the array
        //index: the old code range-checked the index, which NPEs for ions
        //outside [beginMass, endMass] and drops in-range ions at high indexes
        int ion = (int) mz[mass]

        if (ion >= beginMass && ion <= endMass && intensity[mass] > 0) {
          PreparedStatement insertTrace = insertTraceStatements[ion]
          insertTrace.setInt(1, (int) intensity[mass])
          insertTrace.setInt(2, ion)
          insertTrace.setLong(3, spectraId)
          insertTrace.addBatch()
        }
      }

      //in test mode stop after testSize spectra
      if (testMode && i >= testSize) {
        break
      }
    }

    println "executing batch statements"
    //execute all the batch statements for the ions
    for (int i = beginMass; i <= endMass; i++) {
      insertTraceStatements[i].executeBatch()
      insertTraceStatements[i].close()
    }

    //execute the batch update
    insertSpectra.executeBatch()

    //close statements
    insertSpectra.close()
    insertSample.close()

    println "commiting transaction..."

    //commit the transaction
    session.commit()

    //set the node to null to free the memory
    node = null

    //if cleanup, then we have to delete the file, which was generated. Should only apply for compressed files
    if (cleanup) {
      println "cleaning up"
      file.delete()
    }

  }
  catch (Exception e) {
    println "error: ${e.getMessage()}"
    e.printStackTrace()
    //undo the partial import so the file can be retried on a later run
    session.rollback()
  }
  println "done with file ${file}"

}

/**
 * Generates our table model: creates the iontrace, spectra and netcdf
 * tables in the public schema if they do not exist yet (checked via
 * pg_tables).
 *
 * @param con open database connection
 */
static void generateTables(Connection con) {
  //one statement is reused for every check and DDL; the old code leaked a
  //fresh Statement per check by reassigning the variable without closing
  Statement statement = con.createStatement()

  ResultSet result = statement.executeQuery("select * from pg_tables where schemaname = 'public' and tablename = 'iontrace'")
  boolean next = result.next()
  result.close()

  if (!next) {
    print "table doesn't exist yet... -> "
    statement.execute("CREATE TABLE iontrace (intensity integer, ion smallint, spectra_id integer)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_tables where schemaname = 'public' and tablename = 'spectra'")
  next = result.next()
  result.close()

  if (!next) {
    print "table doesn't exist yet... -> "
    statement.execute("CREATE TABLE spectra (id integer primary key, retentiontime integer, file_id integer)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_tables where schemaname = 'public' and tablename = 'netcdf'")
  next = result.next()
  result.close()

  if (!next) {
    print "table doesn't exist yet... -> "
    statement.execute("CREATE TABLE netcdf (id bigint primary key, name VARCHAR(100))")
    println "created"
  }

  statement.close()

}
/**
 * Generates the id sequences (seq_sample, seq_spectra, seq_trace) if
 * they do not exist yet (checked via pg_statio_user_sequences).
 *
 * @param con open database connection
 */
static void generateSequences(Connection con) {
  //one statement is reused for every check and DDL; the old code leaked a
  //fresh Statement per check by reassigning the variable without closing
  Statement statement = con.createStatement()

  ResultSet result = statement.executeQuery("select * from pg_statio_user_sequences where relname = 'seq_sample'")
  boolean next = result.next()
  result.close()

  if (!next) {
    print "sequence doesn't exist yet... -> "
    statement.execute("CREATE SEQUENCE seq_sample MINVALUE 0 CACHE 50")
    println "created"
  }

  result = statement.executeQuery("select * from pg_statio_user_sequences where relname = 'seq_spectra'")
  next = result.next()
  result.close()

  if (!next) {
    print "sequence doesn't exist yet... -> "
    statement.execute("CREATE SEQUENCE seq_spectra MINVALUE 0 CACHE 20000")
    println "created"
  }

  result = statement.executeQuery("select * from pg_statio_user_sequences where relname = 'seq_trace'")
  next = result.next()
  result.close()

  if (!next) {
    print "sequence doesn't exist yet... -> "
    statement.execute("CREATE SEQUENCE seq_trace MINVALUE 0  CACHE 500")
    println "created"
  }

  statement.close()

}
/**
 * Generates the per-ion partitions of the iontrace table: for every ion
 * in [beginMass, endMass] a child table, its ion index and an insert
 * redirection rule are created unless they already exist.
 *
 * @param con       open database connection
 * @param beginMass first ion to partition
 * @param endMass   last ion to partition
 */
static void generatePartition(Connection con, int beginMass, int endMass) {

  println "generating/checking partitions..."

  //flush any pending work before issuing DDL
  con.commit()

  (beginMass..endMass).each { int ion ->
    //does the partition table for this ion already exist?
    Statement lookup = con.createStatement()
    ResultSet match = lookup.executeQuery("select * from pg_tables where schemaname = 'public' and tablename = 'iontrace_${ion}'")
    boolean exists = match.next()
    match.close()
    lookup.close()

    //each ion gets its own table, index and insert rule
    if (!exists) {
      print "table doesn't exist yet... -> "
      Statement ddl = con.createStatement()
      ddl.execute("CREATE TABLE iontrace_${ion} (CHECK ( ion = ${ion} )) INHERITS (iontrace)")
      ddl.execute("CREATE INDEX index_iontrace_${ion} ON iontrace_${ion} (ion)")
      ddl.execute("CREATE OR REPLACE RULE rule_index_iontrace_${ion} AS ON INSERT TO iontrace WHERE ion = ${ion} DO INSTEAD INSERT INTO iontrace_${ion} VALUES (NEW.intensity, NEW.ion, NEW.spectra_id )")
      ddl.close()
      println "created: iontrace_${ion}"
    }
  }

  con.commit()
}

/**
 * Generates indexes on the database if they don't exist yet; each index
 * name is looked up in pg_indexes before it is created.
 *
 * @param con open database connection
 */
static void generateIndexes(Connection con) {
  println "generating/checking index..."

  con.commit()

  //one statement is reused for every check and DDL; the old code leaked a
  //fresh Statement per index by reassigning the variable without closing
  Statement statement = con.createStatement()

  ResultSet result = statement.executeQuery("select * from pg_indexes where schemaname = 'public' and indexname = 'index_netcdf_name';")
  boolean next = result.next()
  result.close()

  if (!next) {
    print "index on netcdf doesn't exist yet... -> "
    statement.execute("CREATE INDEX index_netcdf_name on netcdf (name)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_indexes where schemaname = 'public' and indexname = 'index_spectra_retentiontime';")
  next = result.next()
  result.close()

  if (!next) {
    print "index on spectra doesn't exist yet... -> "
    statement.execute("CREATE INDEX index_spectra_retentiontime on spectra (retentiontime)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_indexes where schemaname = 'public' and indexname = 'index_spectra_file';")
  next = result.next()
  result.close()

  if (!next) {
    print "index on spectra doesn't exist yet... -> "
    statement.execute("CREATE INDEX index_spectra_file on spectra (file_id)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_indexes where schemaname = 'public' and indexname = 'index_iontrace_ion';")
  next = result.next()
  result.close()

  if (!next) {
    print "index on iontrace doesn't exist yet... -> "
    statement.execute("CREATE INDEX index_iontrace_ion on iontrace (ion)")
    println "created"
  }

  result = statement.executeQuery("select * from pg_indexes where schemaname = 'public' and indexname = 'index_iontrace_spectra';")
  next = result.next()
  result.close()

  if (!next) {
    print "index on iontrace doesn't exist yet... -> "
    statement.execute("CREATE INDEX index_iontrace_spectra on iontrace (spectra_id)")
    println "created"
  }

  statement.close()

  con.commit()
}

/**
 * Fetches the next value of the given database sequence.
 *
 * @param con  open database connection
 * @param name name of the sequence, e.g. "seq_spectra" (internal, trusted input)
 * @return the next sequence value
 */
static long getNextId(Connection con, String name) {
  Statement state = con.createStatement()
  ResultSet result = state.executeQuery("SELECT nextval('${name}')")
  result.next()

  //nextval returns a bigint - read it as long so large ids don't truncate
  //(the old getInt(1) silently overflowed past Integer.MAX_VALUE)
  long value = result.getLong(1)
  result.close()
  state.close()

  return value
}
/**
 * Does the actual work of the import: polls files from the shared queue and
 * imports each accepted ".cdf"/".cdf.gz" file through its own database session.
 * The main thread stops a worker by clearing stillRunning and joining its thread.
 */
class Worker implements Runnable {

  Connection workerSession
  LinkedBlockingQueue<File> workerQueue
  //volatile so the stop signal written by the main thread is seen promptly
  volatile boolean stillRunning
  Thread thread
  int workerBeginMass
  int workerEndMass
  boolean workerTestMode
  int workerTestSize
  Properties workerProps
  String workerUrl

  public void run() {
    //keep draining until the stop flag is cleared AND the queue is empty,
    //so files still queued at shutdown are not silently dropped
    while (stillRunning || !workerQueue.isEmpty()) {
      //poll with a timeout instead of isEmpty()+take(): with several workers
      //that check races and a losing worker would block in take() forever
      File file = workerQueue.poll(1, TimeUnit.SECONDS)
      if (file == null) {
        continue
      }

      String name = file.getName()
      String lower = name.toLowerCase()
      boolean compressed

      if (lower.endsWith(".cdf.gz")) {
        compressed = true;

        println "compressed cdf file!"
        //strip the literal suffix; replaceAll treated "." as a regex
        //wildcard and could mangle names matching elsewhere
        name = name.substring(0, name.length() - ".cdf.gz".length())
      }
      else if (lower.endsWith(".cdf")) {
        println "uncompressed cdf file!"
        compressed = false

        name = name.substring(0, name.length() - ".cdf".length())
      }
      else {
        //not a cdf file, ignore it
        continue
      }

      //every file gets its own short-lived session
      Connection session = DriverManager.getConnection(workerUrl, workerProps)
      NetCdfImportDaemon.importFile(file, session, compressed, name, workerBeginMass, workerEndMass, workerTestMode, workerTestSize)
      println "closing session"
      session.close()

      System.gc()
    }


  }
}
